package com.rabbitmq.spring.api;

import com.rabbitmq.client.BuiltinExchangeType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import static com.rabbitmq.common.constants.CommonConstants.*;


/**
 * @author
 * @Describe headers交换机
 * @date
 */
@Component
@Slf4j(topic = "SpringConsumerHeaders")
public class SpringConsumerHeaders {

    /**
     * 声明Queue
     *
     * @return
     */
    @Bean
    public Queue publishHeadersAll() {
        Queue queue = new Queue(SPRING_QUEUE_NAME_PREFIX + "publishHeadersAll");
        return queue;
    }

    /**
     * 声明Queue
     *
     * @return
     */
    @Bean
    public Queue publishHeadersAny() {
        Queue queue = new Queue(SPRING_QUEUE_NAME_PREFIX + "publishHeadersAny");
        return queue;
    }

    /**
     * 声明headers交换机
     *
     * @return
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(SPRING_EXCHANGE_NAME_PREFIX + BuiltinExchangeType.HEADERS.getType() + "_publishHeaders");
    }

    /**
     * 全部匹配
     *
     * @return
     */
    @Bean
    public Binding bindHeadersAll(Queue publishHeadersAll, HeadersExchange headersExchange) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("1", 1);
        headers.put("2", 2);
        headers.put("3", 3);
        // 只有当发送消息的headers为headers内容时能接收
        return BindingBuilder.bind(publishHeadersAll).to(headersExchange).whereAll(headers).match();
    }

    /**
     * 任意匹配
     *
     * @return
     */
    @Bean
    public Binding bindHeadersAny(Queue publishHeadersAny, HeadersExchange headersExchange) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("1", 11);
        headers.put("2", 2);
        // 当发送消息的headers为包含headers内容任意一个时就能接收
        return BindingBuilder.bind(publishHeadersAny).to(headersExchange).whereAny(headers).match();
    }

    @RabbitListener(queues = SPRING_QUEUE_NAME_PREFIX + "publishHeadersAll")
    void publishHeadersAll(Message message) throws UnsupportedEncodingException {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        log.info(" ===== publishHeadersAll msg is {} ===== ", new String(message.getBody(), StringUtils.isEmpty(contentType) ? UTF_8 : contentType) +
                ",headers is " + messageProperties.getHeaders());
    }

    @RabbitListener(queues = SPRING_QUEUE_NAME_PREFIX + "publishHeadersAny")
    void publishHeadersAny(Message message) throws UnsupportedEncodingException {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        log.info(" ===== publishHeadersAny msg is {} ===== ", new String(message.getBody(), StringUtils.isEmpty(contentType) ? UTF_8 : contentType) +
                ",headers is " + messageProperties.getHeaders());
    }
}
